In [1]:
import math
import pickle as p
import tensorflow as tf
import numpy as np
import json
In [2]:
n_input_nodes = 2
n_output_nodes = 1
In [3]:
x = tf.placeholder(tf.float32, (None, n_input_nodes))
W = tf.Variable(tf.ones((n_input_nodes, n_output_nodes)), dtype=tf.float32)
b = tf.Variable(tf.zeros(n_output_nodes), dtype=tf.float32)
In [4]:
z = tf.matmul(x, W) + b
out = tf.sigmoid(z)
In [5]:
test_input = [[0.5, 0.5]]
with tf.Session() as session:
    init = tf.global_variables_initializer()
    session.run(init)
    feed_dict = {x: test_input}
    output = session.run([out], feed_dict=feed_dict)
    print(output[0])
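Sanity check: with W initialized to all ones and b to zeros, z = 0.5 + 0.5 = 1.0, so the printed output should be sigmoid(1.0) ≈ 0.731.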
See https://github.com/nicholaslocascio/bcs-lstm/blob/master/Lab.ipynb for the original lab.
Sentiment classification will be done on words, not on characters!
In [6]:
# set variables
tweet_size = 20
hidden_size = 100
vocab_size = 7597 # number of words in our vocabulary
batch_size = 64
In [7]:
# this just makes sure that all our following operations will be placed in the right graph.
tf.reset_default_graph()
# create a session variable that we can run later.
session = tf.Session()
In [8]:
# shape: batch_size x tweet_size x vocab_size (one one-hot vector per word in the tweet)
tweets = tf.placeholder(dtype=tf.float32, shape=[None, tweet_size, vocab_size])
# 1d vector of size batch_size as we predict one value per tweet in batch
labels = tf.placeholder(dtype=tf.float32, shape=[None])
We want to feed the input sequence, word by word, into an LSTM layer, or multiple LSTM layers (we could also call this an LSTM encoder). At each "timestep", we feed in the next word, and the LSTM updates its cell state. The final LSTM state can then be fed through one or more classification layers to get our sentiment prediction.
In [9]:
# create 2 LSTM cells; each LSTMCell defines a full layer of LSTM units, not a single unit
lstm_cell_1 = tf.contrib.rnn.LSTMCell(hidden_size)
lstm_cell_2 = tf.contrib.rnn.LSTMCell(hidden_size)
# create multiple LSTM layers by wrapping the two lstm cells in MultiRNNCell
multi_lstm_cells = tf.contrib.rnn.MultiRNNCell([lstm_cell_1, lstm_cell_2], state_is_tuple=True)
# define operation that runs LSTM graph across time, on the data
_, final_state = tf.nn.dynamic_rnn(multi_lstm_cells, tweets, dtype=tf.float32)
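Quick shape check (a minimal sketch, not part of the original lab): with state_is_tuple=True, final_state holds one LSTMStateTuple(c, h) per layer, where c and h each have shape [batch_size (None here), hidden_size].
In [ ]:
# inspect the state structure returned by dynamic_rnn
for layer_index, state_tuple in enumerate(final_state):
    print('layer', layer_index,
          'c:', state_tuple.c.get_shape(),   # cell state
          'h:', state_tuple.h.get_shape())   # hidden state (the cell's output)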
In [10]:
# helper that creates a weight matrix (plus optional bias) and applies them to the input
def linear(input_, output_size, name, init_bias=0.0):
    shape = input_.get_shape().as_list()
    with tf.variable_scope(name):
        W = tf.get_variable(
            name='weights',
            shape=[shape[-1], output_size],
            dtype=tf.float32,
            initializer=tf.random_normal_initializer(
                stddev=1.0 / math.sqrt(shape[-1])))
        if init_bias is None:
            return tf.matmul(input_, W)
        b = tf.get_variable(
            name='bias',
            shape=[output_size],
            initializer=tf.constant_initializer(init_bias))
    return tf.matmul(input_, W) + b
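The stddev = 1 / sqrt(fan_in) scaling of the weight initializer keeps the variance of the pre-activations roughly independent of the input width; it is the same idea as Xavier/Glorot initialization.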
In [11]:
# Note: we feed final_state[-1][-1] into linear, since final_state holds,
# for each layer, a tuple of the cell state (used internally by the cell
# to keep track of things) and the hidden state (the output of the cell).
# We want the hidden state of the last layer, hence final_state[-1][-1].
# pass final state into linear function to get output
sentiment = linear(final_state[-1][-1], 1, 'output')
In [12]:
# define loss (cross-entropy); the classification layer's output (logit) must be mapped to a probability in [0, 1] -> use sigmoid
sentiment = tf.squeeze(sentiment, [1])
# gives loss for each example in batch
loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=sentiment, labels=labels)
# take mean of all losses
loss = tf.reduce_mean(loss)
# round probabilities to get a 1 or 0 classification
prob = tf.nn.sigmoid(sentiment)
prediction = tf.to_float(tf.greater_equal(prob, 0.5))
# count how many predictions in the batch disagree with the true labels
pred_err = tf.to_float(tf.not_equal(prediction, labels))
pred_err = tf.reduce_sum(pred_err)
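For reference, tf.nn.sigmoid_cross_entropy_with_logits computes the numerically stable form max(z, 0) - z*y + log(1 + exp(-|z|)) of the cross-entropy -y*log(sigmoid(z)) - (1-y)*log(1-sigmoid(z)). A quick NumPy check with toy values (illustrative only):
In [ ]:
z, y = 2.0, 1.0  # toy logit and label
sig = 1 / (1 + np.exp(-z))
naive = -y * np.log(sig) - (1 - y) * np.log(1 - sig)
stable = max(z, 0) - z * y + np.log(1 + np.exp(-abs(z)))
print(naive, stable)  # both come out to about 0.127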
In [13]:
# train model - define optimizer (adam)
optimizer = tf.train.AdamOptimizer().minimize(loss)
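tf.train.AdamOptimizer defaults to a learning rate of 0.001; pass learning_rate=... to AdamOptimizer() if you want to tune it.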
In [14]:
# initialize variables
tf.global_variables_initializer().run(session=session)
In [25]:
# encode each tweet as a sequence of 20 one-hot word vectors
def one_hot(raw_data, vocab_size):
    data = np.zeros((len(raw_data), 20, vocab_size))
    for tweet_index in range(len(raw_data)):
        tweet = raw_data[tweet_index]
        for word_index in range(20):
            word_id = tweet[word_index]
            data[tweet_index, word_index, word_id] = 1
    return data
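A quick usage example with a made-up tweet (illustrative only). Note that this encoding is memory-hungry: at NumPy's default float64, a batch of 64 tweets occupies 64 * 20 * 7597 * 8 bytes ≈ 78 MB, which is why an embedding lookup is the usual alternative to materializing one-hot vectors.
In [ ]:
toy = np.array([[3, 0, 1] + [0] * 17])  # a single hypothetical tweet of 20 word ids
encoded = one_hot(toy, vocab_size)
print(encoded.shape)     # (1, 20, 7597)
print(encoded[0, 0, 3])  # 1.0 -> word id 3 is hot at position 0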
In [26]:
# load data and separate into tweets and labels
train_data = json.load(open('trainTweets_preprocessed.json', 'r'))
train_data = list(
map(lambda row: (np.array(row[0], dtype=np.int32), str(row[1])),
train_data))
train_tweets = np.array([t[0] for t in train_data])
train_labels = np.array([int(t[1]) for t in train_data])
test_data = json.load(open('testTweets_preprocessed.json', 'r'))
test_data = list(
map(lambda row: (np.array(row[0], dtype=np.int32), str(row[1])),
test_data))
print(train_tweets[:5])
print(train_labels[:5])
# we take just the first 1000 examples from the test set for faster evaluation
test_data = test_data[0:1000]
test_tweets = np.array([t[0] for t in test_data])
one_hot_test_tweets = one_hot(test_tweets, vocab_size)
test_labels = np.array([int(t[1]) for t in test_data])
In [21]:
# we'll train with batches of size 64. This means that we run
# our model on 64 examples and then do gradient descent based on the loss
# over those 64 examples.
num_steps = 1000
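With batch_size = 64, the 1000 steps below cover 64,000 training examples; the offset computation in the loop wraps around the training set if it is smaller than that.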
In [27]:
for step in range(num_steps):
    # get data for a batch
    offset = (step * batch_size) % (len(train_data) - batch_size)
    batch_tweets = one_hot(train_tweets[offset:(offset + batch_size)],
                           vocab_size)
    batch_labels = train_labels[offset:(offset + batch_size)]
    # put this data into a dictionary that we feed in when we run
    # the graph. this data fills in the placeholders we made in the graph.
    data = {tweets: batch_tweets, labels: batch_labels}
    # run the 'optimizer', 'loss', and 'pred_err' operations in the graph
    _, loss_value_train, error_value_train = session.run(
        [optimizer, loss, pred_err], feed_dict=data)
    # print stuff every 50 steps to see how we are doing
    if step % 50 == 0:
        print("Minibatch train loss at step", step, ":", loss_value_train)
        print("Minibatch train error: %.3f%%" % error_value_train)
        # get test evaluation
        test_loss = []
        test_error = []
        for batch_num in range(int(len(test_data) / batch_size)):
            test_offset = (batch_num * batch_size) % (
                len(test_data) - batch_size)
            test_batch_tweets = one_hot_test_tweets[test_offset:(
                test_offset + batch_size)]
            test_batch_labels = test_labels[test_offset:(
                test_offset + batch_size)]
            data_testing = {
                tweets: test_batch_tweets,
                labels: test_batch_labels
            }
            loss_value_test, error_value_test = session.run(
                [loss, pred_err], feed_dict=data_testing)
            test_loss.append(loss_value_test)
            test_error.append(error_value_test)
        print("Test loss: %.3f" % np.mean(test_loss))
        print("Test error: %.3f%%" % np.mean(test_error))
In [ ]: